Amazon GameLift In C# 08: より良いチャットサーバーを作ります(Part1)
概要
前回作ったチャットソフトは、メッセージ送受信の検証として、一回通信しかできなかったので、これから継続的に、複数のユーザーでチャットできるチャットソフトを作ります。
説明したいことが比較的に多くて、ソースも少し長いので、サーバープログラムの解説はChatServer.cs
とConncetedClient.cs
を中心として、二つ記事に分けて説明いたします。
マシン環境
CPU : Intel i7-6920HQ
GPU : AMD Radeon Pro 460
Memory : 16GB
System : Windows 10 (with .NET 5 installed)
IDE : Visual Studio 2019 Community
Editor : Visual Studio Code
Terminal : Windows Terminal (or Command Prompt)
ファイルの構造とクラスの運用
前回作ったチャットソフトのソースコードは一つファイルにまとめてテストしたが、今回のソースは比較的に長いので、管理しやすくするため、複数のファイルに分けました。
プロジェクトはGitHubにアップロードしています : AGLW-CSharp-BetterChatServerSample
ソースコードファイル :
- Program.cs (プログラムの入口)
-
GameLiftServer.cs (Amazon GameLiftとの接続、そして
ChatServer
を管理するクラス) -
ChatServer.cs (クライアント
ConnectedClient
との接続、通信管理クラス) -
ConnectedClient.cs (クライアントの接続情報と
NetworkStream
での受送信管理クラス)
メッセージ管理、受送信のコンテナはConcurrentQueue
(FIFO)使うので、順序的なメッセージの管理がしやすいです。そしてAction
というDelegate
を使って、クライアントに一斉送信することが楽です。
Program.cs
とGameLiftServer.cs
の中身はそこまで変わらないので、説明は省略します。
ChatServer.cs ソースコード
ChatServer.cs
ソースコードを貼り付けます。
using System; using System.Collections.Generic; using System.Threading; using System.Net; using System.Net.Sockets; using System.Collections.Concurrent; using Aws.GameLift.Server; using Aws.GameLift.Server.Model; namespace AGLW_CSharp_BetterChatServerSample { class ChatServer { public static int MessageLength = 256; // 256bytes public static int SleepDuration = 100; // 100ms // A UTF-8 encoder to process byte[] <-> string conversion public readonly System.Text.Encoding Encoder = System.Text.Encoding.UTF8; public GameSession ManagedGameSession { get; private set; } = null; // TCP lisenter has it's own thread private TcpListener listener = null; private Thread listenerThread = null; private Thread senderThread = null; private Thread retrieverThread = null; private ConcurrentQueue<byte[]> messagePool = null; private event Action<byte[]> sendMsgDelegate; List<ConnectedClient> clientPool = null; List<ConnectedClient> clientsInQueue = null; public ChatServer(GameSession session) { ManagedGameSession = session; messagePool = new ConcurrentQueue<byte[]>(); clientPool = new List<ConnectedClient>(); clientsInQueue = new List<ConnectedClient>(); StartServer(); } internal void UpdateGameSession(GameSession gameSession) { ManagedGameSession = gameSession; } public void StartServer() { // Create a TCP listener(in a listener thread) from port when when ProcessReady() returns success LaunchListenerThread(ManagedGameSession.Port); LaunchSenderThread(); LaunchRetrieverThread(); } // A method creates thread for listener void LaunchListenerThread(int port) { listenerThread = new Thread(() => { Listen(port); }); listenerThread.Start(); Console.WriteLine($"Server : Listener thread is created and started"); } void LaunchSenderThread() { senderThread = new Thread(() => SendToAllClients()); senderThread.Start(); } private void LaunchRetrieverThread() { retrieverThread = new Thread(() => RetrieveFromAllClients()); retrieverThread.Start(); } // A method listens the port. // When client connects : // 1) Create ConnectedClient instance from TcpClient -> 2) Register SendMessage() to delegate -> 3) Add client to clientsInQueue void Listen(int port) { listener = TcpListener.Create(ManagedGameSession.Port); listener.Start(); Console.WriteLine($"Server : Start listening port {port}"); while (true) { // TcpClient.AccecptTcpClient() blocks TcpClient client = listener.AcceptTcpClient(); ConnectedClient c = new ConnectedClient(client); sendMsgDelegate += c.SendMessage; c.StartClient(); clientsInQueue.Add(c); } } private void SendToAllClients() { while (true) { SleepForAWhile(); if (messagePool.Count > 0) { byte[] bytes = new byte[MessageLength]; if (messagePool.TryDequeue(out bytes)) { Console.WriteLine($"Message sent: {Encoder.GetString(bytes)}"); sendMsgDelegate(bytes); } } } } private void RetrieveFromAllClients() { while (true) { if (clientsInQueue.Count > 0) { clientPool.AddRange(clientsInQueue); clientsInQueue.Clear(); } SleepForAWhile(); List<ConnectedClient> disconnectedClients = new List<ConnectedClient>(); byte[] bytes = new byte[MessageLength]; foreach (var c in clientPool) { if (c != null && c.TargetClient.Connected) { if (c.RetrieveMessage(out bytes)) { messagePool.Enqueue(bytes); } } else { disconnectedClients.Add(c); } } // Release disconnected client object foreach (var dc in disconnectedClients) { sendMsgDelegate -= dc.SendMessage; clientPool.Remove(dc); } } } private void SleepForAWhile() { Thread.Sleep(SleepDuration); } } }
ChatServer.cs メンバー変数の解説
public static int MessageLength = 256; // 256bytes public static int SleepDuration = 100; // 100ms // A UTF-8 encoder to process byte[] <-> string conversion public readonly System.Text.Encoding Encoder = System.Text.Encoding.UTF8; public GameSession ManagedGameSession { get; private set; } = null; // TCP lisenter has it's own thread private TcpListener listener = null; private Thread listenerThread = null; private Thread senderThread = null; private Thread retrieverThread = null; private ConcurrentQueue<byte[]> messagePool = null; private event Action<byte[]> sendMsgDelegate; List<ConnectedClient> clientPool = null; List<ConnectedClient> clientsInQueue = null;
MessageLength
/SleepDuration
メッセージの長さは256bytes
に決まっていて、そしてサーバー負荷を軽減するため、100ms
(0.1秒)という睡眠時間をつけます。-
ManagedGameSession
GameLiftServer.cs
のOnStartGameSession
とUpdateGameSession
のタイミングでGameSession
を保存するメンバー変数です。 -
TcpListener :
listener
クライアントの接続動作を監視するメンバーです。 -
Thread :
listenerThread
/senderThread
/retrieverThread
プログラムのmain thread
以外は三つThread
があります。
listenerThread
は接続を監視するThread
、senderThread
はメッセージの一斉送信するThread
、そしてretrieverThread
は各クライアントからのメッセージを受信するThread
です。 -
ConcurrentQueue<byte[]> :
messagePool
First In First Out(FIFO)というデータ構造のコンテナーです。順序的に受信したメッセージはここに保存し、送信するメッセージはここから取り出します。 -
event Action<byte[]> :
sendMsgDelegate
クライアントが接続した後、SendMessage()
という関数がこのDelegate(Action)
に登録し、メッセージがあったら、一斉送信されます。 -
List
clientPool
/clientsInQueue
クライアントを管理するリストです。Multi-Thread
の処理には揉め事があるため、clientPool
に関わる処理が終るまで、新しい接続できたクライアントはclientsInQueue
に入れて、後でclientPool
に追加します。
ChatServer.cs 関数の解説
// A method listens the port. // When client connects : // 1) Create ConnectedClient instance from TcpClient -> 2) Register SendMessage() to delegate -> 3) Add client to clientsInQueue void Listen(int port) { listener = TcpListener.Create(ManagedGameSession.Port); listener.Start(); Console.WriteLine($"Server : Start listening port {port}"); while (true) { // TcpClient.AccecptTcpClient() blocks TcpClient client = listener.AcceptTcpClient(); ConnectedClient c = new ConnectedClient(client); sendMsgDelegate += c.SendMessage; c.StartClient(); clientsInQueue.Add(c); } }
Listen(int port)
関数は前回と少し変わりました。
接続できたTcpClient
のインスタンスでConnectedClient
を作って、そしてSentMessage
関数をsendMsgDelegate
に登録します。
ConnectedClient
のインスタンス自体はStartClient()
で作業開始し、clientsInQueue
に追加され、他の関数でclientPool
に追加待ち。
private void SleepForAWhile() { Thread.Sleep(SleepDuration); }
Thread
を睡眠させる操作をこのSleepForAWhile()
関数に統一させます。
private void LaunchRetrieverThread() { retrieverThread = new Thread(() => RetrieveFromAllClients()); retrieverThread.Start(); } private void RetrieveFromAllClients() { while (true) { if (clientsInQueue.Count > 0) { clientPool.AddRange(clientsInQueue); clientsInQueue.Clear(); } SleepForAWhile(); List<ConnectedClient> disconnectedClients = new List<ConnectedClient>(); byte[] bytes = new byte[MessageLength]; foreach (var c in clientPool) { if (c != null && c.TargetClient.Connected) { if (c.RetrieveMessage(out bytes)) { messagePool.Enqueue(bytes); } } else { disconnectedClients.Add(c); } } // Release disconnected client object foreach (var dc in disconnectedClients) { sendMsgDelegate -= dc.SendMessage; clientPool.Remove(dc); } } }
LaunchRetrieverThread()
でRetrieveFromAllClients()
を実行するThread
を起動します。
if (clientsInQueue.Count > 0) { clientPool.AddRange(clientsInQueue); clientsInQueue.Clear(); }
メッセージをまとめる前に、隊列に待っているクライアントを確認し、clientPool
に追加する。
List<ConnectedClient> disconnectedClients = new List<ConnectedClient>(); byte[] bytes = new byte[MessageLength]; foreach (var c in clientPool) { if (c != null && c.TargetClient.Connected) { if (c.RetrieveMessage(out bytes)) { messagePool.Enqueue(bytes); } } else { disconnectedClients.Add(c); } }
ループですべてのクライアントからメッセージを保存し、切断されるクライアントもdisconnectedClients
に記録します。
// Release disconnected client object foreach (var dc in disconnectedClients) { sendMsgDelegate -= dc.SendMessage; clientPool.Remove(dc); }
切断したクライアントのSendMessage
関数を登録解除し、clientPool
から削除します。
void LaunchSenderThread() { senderThread = new Thread(() => SendToAllClients()); senderThread.Start(); } private void SendToAllClients() { while (true) { SleepForAWhile(); if (messagePool.Count > 0) { byte[] bytes = new byte[MessageLength]; if (messagePool.TryDequeue(out bytes)) { Console.WriteLine($"Message sent: {Encoder.GetString(bytes)}"); sendMsgDelegate(bytes); } } } }
LaunchSenderThread()
でSendToAllClients()
を実行するThread
を起動します。
メッセージがあったら、messagePool.TryDequeue(out bytes)
でメッセージを取り出して、sendMsgDelegate()
でクライアントに一斉送信します。
これでChatServer.cs
の説明が完了いたします。
最後
クライアントとの通信全体はChatServer.cs
で管理していますが、細かいメッセージ処理はConnectedClient.cs
でやっています。
ConnectedClient.cs
は次回の記事で解説します。